Python实现线程池之线程安全队列

您所在的位置:网站首页 python list 实现队列 Python实现线程池之线程安全队列

Python实现线程池之线程安全队列

2022-06-12 11:57| 来源: 网络整理| 查看: 265

目录 一、线程池组成 二、线程安全队列的实现 三、测试逻辑 3.1、测试阻塞逻辑 3.2、测试读写加锁逻辑

本文实例为大家分享了Python实现线程池之线程安全队列的具体代码,供大家参考,具体内容如下

一、线程池组成

一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。

二、线程安全队列的实现

包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。

class ThreadSafeQueue(object):     def __init__(self, max_size=0):         self.queue = []         self.max_size = max_size  # max_size为0表示无限大         self.lock = threading.Lock()  # 互斥量         self.condition = threading.Condition()  # 条件变量     def size(self):         """         获取当前队列的大小         :return: 队列长度         """         # 加锁         self.lock.acquire()         size = len(self.queue)         self.lock.release()         return size     def put(self, item):         """         将单个元素放入队列         :param item:         :return:         """         # 队列已满 max_size为0表示无限大         if self.max_size != 0 and self.size() >= self.max_size:             return ThreadSafeException()         # 加锁         self.lock.acquire()         self.queue.append(item)         self.lock.release()         self.condition.acquire()         # 通知等待读取的线程         self.condition.notify()         self.condition.release()         return item     def batch_put(self, item_list):         """         批量添加元素         :param item_list:         :return:         """         if not isinstance(item_list, list):             item_list = list(item_list)         res = [self.put(item) for item in item_list]         return res     def pop(self, block=False, timeout=0):         """         从队列头部取出元素         :param block: 是否阻塞线程         :param timeout: 等待时间         :return:         """         if self.size() == 0:             if block:                 self.condition.acquire()                 self.condition.wait(timeout)                 self.condition.release()             else:                 return None         # 加锁         self.lock.acquire()         item = None         if len(self.queue):             item = self.queue.pop()         self.lock.release()         return item     def get(self, index):         """         获取指定位置的元素         :param index:         :return:         """         if self.size() == 0 or index >= self.size():             return None         # 加锁         self.lock.acquire()         item = self.queue[index]         self.lock.release()         return item class ThreadSafeException(Exception):     pass

三、测试逻辑

3.1、测试阻塞逻辑 def thread_queue_test_1():     thread_queue = ThreadSafeQueue(10)     def producer():         while True:             thread_queue.put(random.randint(0, 10))             time.sleep(2)     def consumer():         while True:             print('current time before pop is %d' % time.time())             item = thread_queue.pop(block=True, timeout=3)             # item = thread_queue.get(2)             if item is not None:                 print('get value from queue is %s' % item)             else:                 print(item)             print('current time after pop is %d' % time.time())     t1 = threading.Thread(target=producer)     t2 = threading.Thread(target=consumer)     t1.start()     t2.start()     t1.join()     t2.join()

测试结果:

我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。

3.2、测试读写加锁逻辑 def thread_queue_test_2():     thread_queue = ThreadSafeQueue(10)     def producer():         while True:             thread_queue.put(random.randint(0, 10))             time.sleep(2)     def consumer(name):         while True:             item = thread_queue.pop(block=True, timeout=1)             # item = thread_queue.get(2)             if item is not None:                 print('%s get value from queue is %s' % (name, item))             else:                 print('%s get value from queue is None' % name)     t1 = threading.Thread(target=producer)     t2 = threading.Thread(target=consumer, args=('thread1',))     t3 = threading.Thread(target=consumer, args=('thread2',))     t1.start()     t2.start()     t3.start()     t1.join()     t2.join()     t3.join()

测试结果:

生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3